package com.katesoft.scale4j.agent;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.management.MalformedObjectNameException;

import net.jcip.annotations.NotThreadSafe;

import org.springframework.context.Lifecycle;
import org.springframework.core.io.Resource;

import com.katesoft.scale4j.agent.args.AgentOptions;
import com.katesoft.scale4j.agent.exceptions.ExecutionTimeoutException;
import com.katesoft.scale4j.agent.exceptions.FailedToStarAllServicesException;
import com.katesoft.scale4j.agent.service.ServiceDetails;
import com.katesoft.scale4j.agent.service.ServiceStatus;
import com.katesoft.scale4j.common.concurrent.FixedTimeOperation;
import com.katesoft.scale4j.common.io.FileUtility;
import com.katesoft.scale4j.common.lang.RuntimeUtility;
import com.katesoft.scale4j.common.utils.StringUtility;
import com.katesoft.scale4j.log.LogFactory;
import com.katesoft.scale4j.log.Logger;
import com.katesoft.scale4j.rttp.jmx.RttpSupportStopOperationInvoker;
import com.katesoft.xml.jvmcluster.ApplicationConfigurationDocument.ApplicationConfiguration;
import com.katesoft.xml.jvmcluster.LogConfigurationDocument.LogConfiguration;
import com.katesoft.xml.jvmcluster.ServiceDocument.Service;

/**
 * starts user defined services.
 * <p/>
 * handles life cycle of all services and exposes some useful functionality which can be used for
 * external exposure (for example through JMX).
 * 
 * @author kate2007
 */
@NotThreadSafe
public class ServiceLauncher implements Lifecycle {
   private static final int STOP_SERVICE_TIMEOUT = 15 * 1000;
   private final ExecutorService executor = Executors.newCachedThreadPool();
   private final Logger logger = LogFactory.getLogger(getClass());
   private final ConcurrentMap<String, ServiceDetails> services;
   private final ConcurrentMap<String, Process> processes;
   private final Service[] serviceArray;
   private final ApplicationConfiguration configuration;
   private final AgentOptions options;
   private final Semaphore semaphore;

   public ServiceLauncher(ApplicationConfiguration configuration, AgentOptions options) {
      this.configuration = configuration;
      this.options = options;
      serviceArray = configuration.getServiceArray();
      semaphore = new Semaphore(serviceArray.length);
      services = new ConcurrentHashMap<String, ServiceDetails>(serviceArray.length);
      processes = new ConcurrentHashMap<String, Process>(serviceArray.length);
      for (Service s : serviceArray) {
         services.put(s.getName(), new ServiceDetails(s.toString()));
      }
   }

   /**
    * will try to start all services, if unexpected exception occurred will shutdown all started
    * services.
    */
   @Override
   public void start() {
      Service[] arr = configuration.getServiceArray();
      //
      logger.debug("starting %s services", arr.length);
      for (Service service : arr) {
         startProcess(service);
      }
      try {
         if (options.getTimeout() > 0) {
            Thread.sleep((options.getTimeout() / 2) * 1000);
            boolean b = semaphore.tryAcquire(arr.length, (options.getTimeout() / 2), TimeUnit.SECONDS);
            if (!b) {
               stopAllServices(false);
               throw new ExecutionTimeoutException(String.format(
                        "services %s are not finished execution, timeout = [%s secs]", services.keySet().toString(),
                        options.getTimeout()));
            } else {
               semaphore.release(arr.length);
            }
         } else {
            semaphore.acquire(arr.length);
            semaphore.release(arr.length);
         }
      } catch (InterruptedException e) {
         logger.error(e);
         stopAllServices(false);
         throw new FailedToStarAllServicesException(e);
      }
   }

   @Override
   public void stop() {
      stopAllServices(false);
      executor.shutdown();
   }

   /**
    * modify service status(from starting to running for example).
    * 
    * @param key
    *           service name
    * @param status
    *           new status of process
    */
   protected void changeServiceStatus(String key, ServiceStatus status) {
      logger.info("changing status of service %s %s:=>%s", key, services.get(key).getStatus().name(), status.name());
      services.get(key).setStatus(status);
   }

   /**
    * will pass log4j configuration system parameter if necessary.
    * 
    * @param logConfiguration
    *           xml fragment wrapper.
    * @param processBuilder
    *           JVM process builder.
    */
   protected void setLogConfiguration(LogConfiguration logConfiguration, JavaProcessBuilder processBuilder) {
      if (logConfiguration != null && !StringUtility.isEmpty(logConfiguration.getLocation())) {
         try {
            Resource resource = FileUtility.resolve(logConfiguration.getLocation());
            processBuilder.systemProperty("log4j.configuration", resource.getURL().toExternalForm());
         } catch (IOException e) {
            logger.error("failed to resolve log configuration for location %s", logConfiguration.getLocation());
         }
      }
   }

   /**
    * start service process from XML configuration.
    * 
    * @param service
    *           xml configuration wrapper
    */
   protected void startProcess(Service service) {
      if (services.get(service.getName()).getStatus().isRunning()) {
         throw new IllegalStateException(String.format("service = %s is already running", service.getName()));
      }
      JavaProcessBuilder p1 = new JavaProcessBuilder(configuration, service);
      setLogConfiguration(service.getLogConfiguration(), p1);
      startProcessThread(service.getName(), p1);
   }

   /**
    * launch service in separate JVM.
    * 
    * @param serviceName
    *           unique service's name.
    * @param p
    *           java process builder to be used for JVM creation.
    * @return thread that was used for service launch.
    */
   protected Thread startProcessThread(final String serviceName, final JavaProcessBuilder p) {
      p.classpath(System.getProperty("java.class.path"));
      p.systemProperty(RuntimeUtility.VAR_SERVICE_ID, serviceName);
      Runnable runnable = new Runnable() {
         @Override
         public void run() {
            logger.debug("launching JVM for service[%s]", serviceName);
            try {
               semaphore.acquire();
               Process process = p.launch(System.out, System.err);
               processes.put(serviceName, process);
               changeServiceStatus(serviceName, ServiceStatus.RUNNING);
               process.waitFor();
               logger.info("[process=%s, service=%s] finished execution with exit_code=%s", process, serviceName,
                        process.exitValue());
               if (process.exitValue() != 0) {
                  if (services.get(serviceName).getStatus() != ServiceStatus.TERMINATED) {
                     changeServiceStatus(serviceName, ServiceStatus.FAILED_TO_START);
                     logger.error("[process=%s, service=%s] terminated with bad exit value %s, please check logs",
                              process, serviceName, process.exitValue());
                  }
               } else {
                  logger.info("execution of %s was successful", serviceName);
                  changeServiceStatus(serviceName, ServiceStatus.TERMINATED);
               }
               RuntimeUtility.gc();
            } catch (IOException e) {
               logger.error(e);
            } catch (InterruptedException e) {
               logger.error(e);
               Thread.interrupted();
            } finally {
               semaphore.release();
            }
         }
      };
      Thread serviceThread = new Thread(runnable);
      serviceThread.setDaemon(false);
      serviceThread.setName(String.format("jvmcluster_%s_launcher_thread", serviceName));
      serviceThread.start();
      return serviceThread;
   }

   /** @return true if there are running services. */
   @Override
   public boolean isRunning() {
      return runningCount() > 0;
   }

   /** @return number of running services */
   public int runningCount() {
      return services.size() - semaphore.availablePermits();
   }

   /** @return collection with service details. */
   public ConcurrentMap<String, ServiceDetails> getServices() {
      return services;
   }

   /**
    * shutdown all running services(each service in separate thread)
    * 
    * @param forceKill
    *           do we need to destroy process or need to send shutdown command and wait for shutdown
    *           finish?
    */
   public void stopAllServices(final boolean forceKill) {
      long time = System.currentTimeMillis();
      final CountDownLatch stopCountDownLatch = new CountDownLatch(services.size());
      for (final String s : services.keySet()) {
         Thread stopThread = new Thread(new Runnable() {
            @Override
            public void run() {
               try {
                  stopService(s, forceKill);
               } catch (Exception e) {
                  logger.error(e);
               } finally {
                  stopCountDownLatch.countDown();
               }
            }
         });
         stopThread.setName(String.format("jvmcluster_%s_stop_thread", s));
         stopThread.start();
      }
      try {
         stopCountDownLatch.await();
         logger.debug("all services stopped in [%s] milisecs", (System.currentTimeMillis() - time));
      } catch (InterruptedException e) {
         logger.error(e);
      }
   }

   /**
    * allows you to stop application service by name.
    * <p/>
    * This functionality can be used for JMX exposing.
    * 
    * @param name
    *           service's unique name.
    * @param forceKill
    *           do we need to destroy process or need to send shutdown command and wait for shutdown
    *           finish?
    * @throws Exception
    *            if unable to connect to remote jmx server or if unable to invoke remote operation
    *            or if unable to destroy process
    */
   public void stopService(final String name, boolean forceKill) throws ExecutionException, InterruptedException,
            TimeoutException {
      logger.info("stopping service = %s", name);
      ServiceStatus status = services.get(name).getStatus();
      if (status.isActive()) {
         changeServiceStatus(name, ServiceStatus.TERMINATED);
         Process process = processes.get(name);
         if (process != null) {
            if (forceKill) {
               process.destroy();
            } else {
               try {
                  for (Service service : serviceArray) {
                     if (service.getName().equals(name)) {
                        int jmxPort = service.getJmxPort().intValue();
                        final RttpSupportStopOperationInvoker proxy = new RttpSupportStopOperationInvoker(
                                 String.format("service:jmx:rmi:///jndi/rmi://localhost:%s/jmxrmi", jmxPort));
                        logger.info("calling remote jmx stopService method for service[%s]", name);
                        FixedTimeOperation operation = new FixedTimeOperation(executor, new Callable<Object>() {
                           @Override
                           public Object call() throws IOException, MalformedObjectNameException {
                              if (proxy.invoke()) {
                                 logger.info("service[%s] stopped using jmx stopService method ", name);
                              }
                              return null;
                           }
                        }, STOP_SERVICE_TIMEOUT);
                        try {
                           operation.call();
                        } catch (ExecutionException re) {
                           Throwable cause = re.getCause();
                           logger.error("unable to stop service %s through jmx console due to error=%s", name, cause);
                        }
                        break;
                     }
                  }
               } catch (TimeoutException e) {
                  logger.warn("unable to stop service %s, timeout = %s excised", name, STOP_SERVICE_TIMEOUT);
               } finally {
                  logger.info("calling destroy on %s for service=%s", process, name);
                  process.destroy();
               }
            }
         }
      } else {
         logger.info("service=%s is not active", name);
      }
   }

   /**
    * allows you to start application service by name.
    * <p/>
    * This functionality can be used for JMX exposing.
    * 
    * @param name
    *           service's unique name.
    */
   public void startService(String name) {
      logger.info("manual start called for service = %s", name);
      for (int i = 0; i < configuration.getServiceArray().length; i++) {
         Service service = configuration.getServiceArray(i);
         if (service.getName().equalsIgnoreCase(name)) {
            startProcess(service);
            break;
         }
      }
   }

   /** @return if one of application services failed to start. */
   public boolean isStartupFailed() {
      return !getFailedServices().isEmpty();
   }

   public Collection<ServiceDetails> getFailedServices() {
      Collection<ServiceDetails> failedServices = new LinkedHashSet<ServiceDetails>();
      for (ServiceDetails next : services.values()) {
         if (next.getStatus() != null && next.getStatus().isFailedToStart()) {
            failedServices.add(next);
         }
      }
      return failedServices;
   }

   public synchronized void join() throws InterruptedException {
      semaphore.acquire(services.size());
      semaphore.release(services.size());
   }
}
